
Make execution plan/blocklist aware of the memory ownership and who runs the plan #26650

Merged
36 commits merged on Jul 22, 2022

Conversation

@jianoaix (Contributor) commented Jul 17, 2022

Why are these changes needed?

Having an indicator of who is running a stage and who created a block list enables eager memory releasing.

This is an alternative to #26196 with better abstraction.

Note: this doesn't work for Dataset.split() yet; that will be handled in a follow-up PR.
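As a rough illustration of the ownership idea (names and structure here are hypothetical, not the actual Ray Data internals): a block list records whether its blocks were created by, and belong to, the consuming API; only in that case may they be dropped eagerly once read.

from typing import Any, List


class OwnedBlockList:
    """Hypothetical simplification of Ray Data's internal block list."""

    def __init__(self, blocks: List[Any], *, owned_by_consumer: bool):
        self._blocks = list(blocks)
        # Records who created (and therefore owns) these blocks: the
        # consuming API may release consumer-owned blocks eagerly once
        # they have been read, while blocks backing a user-held Dataset
        # must be kept.
        self._owned_by_consumer = owned_by_consumer


# Blocks materialized only for a consuming API (e.g. iteration): eligible
# for eager release after consumption.
consumer_owned = OwnedBlockList(["block-0", "block-1"], owned_by_consumer=True)

# Blocks backing a Dataset the user still holds: must be kept.
dataset_owned = OwnedBlockList(["block-0", "block-1"], owned_by_consumer=False)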

Related issue number

#25249

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jianoaix added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send back to the reviewer) on Jul 17, 2022
@ericl (Contributor) left a comment

This seems a lot more natural, though I'm wondering if we can improve the naming / concept a little bit more. The proposal seems OK to me though. What do others think?

Ubuntu and others added 20 commits July 18, 2022 16:59
Signed-off-by: Ubuntu <[email protected]>
My experiments (the script) showed that dataset.split_at_indices() with SPREAD tasks has more predictable performance.

Concretely, on 10 m5.4xlarge nodes with 5000 IOPS disks, calling ds.split_at_indices(81) on a 200GB dataset with 400 blocks: split_at_indices without this PR takes 7-19 seconds, while split_at_indices with SPREAD takes 7-12 seconds.

Signed-off-by: Ubuntu <[email protected]>
Co-authored-by: Kourosh Hakhamaneshi <[email protected]>
Signed-off-by: Ubuntu <[email protected]>
…ject#26094)

Co-authored-by: Eric Liang <[email protected]>
Co-authored-by: matthewdeng <[email protected]>
Co-authored-by: Matthew Deng <[email protected]>
Co-authored-by: Richard Liaw <[email protected]>
Signed-off-by: Ubuntu <[email protected]>
…ints (ray-project#26641)

This PR replaces the dataset.split(..., equal=True) implementation with dataset.split_at_indices(). My experiments (the script) showed that dataset.split_at_indices() has more predictable performance than dataset.split(...).

Concretely, on 10 m5.4xlarge nodes with 5000 IOPS disks:

calling ds.split(81) on a 200GB dataset with 400 blocks: split takes 20-40 seconds, split_at_indices takes ~12 seconds.

calling ds.split(163) on a 200GB dataset with 400 blocks: split takes 40-100 seconds, split_at_indices takes ~24 seconds.

I don't have much insight into the dataset.split() implementation, but with dataset.split_at_indices() we are just launching num_split_at_indices tasks with the SPREAD scheduling strategy, which yields much more stable performance.

Note: the usage of the experimental locality_hints is cleaned up in ray-project#26647
Signed-off-by: Ubuntu <[email protected]>
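As an aside on the SPREAD approach described in the commit above, here is a minimal sketch of how split-style tasks can be spread across a cluster with Ray's SPREAD scheduling strategy; the task body and names are illustrative, not the actual Dataset internals:

import ray

ray.init()

# Illustrative "split" task; scheduling_strategy="SPREAD" asks Ray to
# spread these tasks across the cluster's nodes instead of packing them
# onto as few nodes as possible.
@ray.remote(scheduling_strategy="SPREAD")
def make_split(split_id: int) -> list:
    # Placeholder for building one output split's blocks.
    return [split_id]

# One task per requested split; spreading them is what gave the more
# predictable split_at_indices timings reported in the commit above.
splits = ray.get([make_split.remote(i) for i in range(8)])
print(splits)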
Why are these changes needed?
Since locality_hints is an experimental feature, we stop promoting it in the docs and don't enable it in AIR. See ray-project#26641 for more context.

Signed-off-by: Ubuntu <[email protected]>
- Stop using the dot command to run the ci.sh script: it doesn't fail the build if the command fails on Windows, and it is generally dangerous since it makes unexpected changes to the current shell.
- Fix the Windows build issues this uncovered.

Signed-off-by: Ubuntu <[email protected]>
Signed-off-by: Ubuntu <[email protected]>
Signed-off-by: Ubuntu <[email protected]>
@ericl added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send back to the reviewer) on Jul 19, 2022
@jianoaix (Contributor, Author) commented:

@ericl @clarkzinzow For the "run_by_consumer" flag that's currently attached to each stage, I'm thinking it makes more sense to attach it to the ExecutionPlan instead, since that's the actual unit run by the consumer. Each individual stage can derive the flag from the ExecutionPlan it belongs to. I'll make that change if this sounds good to you.
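A rough sketch of this proposal, assuming hypothetical Stage and ExecutionPlan classes (the real Ray Data internals differ): the plan stores the flag once, and each stage derives it from its owning plan.

from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class Stage:
    # Hypothetical stand-in for one stage of an execution plan.
    name: str
    fn: Callable
    plan: Optional["ExecutionPlan"] = None  # back-reference set by the plan

    @property
    def run_by_consumer(self) -> bool:
        # Derived from the owning plan rather than stored per stage.
        return self.plan.run_by_consumer


@dataclass
class ExecutionPlan:
    # True when the plan is executed by a consuming API, so its outputs
    # may be released eagerly after consumption.
    run_by_consumer: bool
    stages: List[Stage] = field(default_factory=list)

    def add_stage(self, stage: Stage) -> None:
        stage.plan = self
        self.stages.append(stage)


plan = ExecutionPlan(run_by_consumer=True)
plan.add_stage(Stage("map", lambda block: block))
assert plan.stages[0].run_by_consumer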

assert len(blocks) == len(metadata), (blocks, metadata)
self._blocks: List[ObjectRef[Block]] = blocks
self._num_blocks = len(self._blocks)
self._metadata: List[BlockMetadata] = metadata
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
Contributor:

Shall we clearly define what a consumer is in one place, e.g. dataset.py?

Contributor (Author):

Done.

blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
*,
owned_by_consumer: bool,
Contributor:

Is it the idea that if the blocks are owned_by_consumer, then we can eagerly clean them up?

Contributor (Author):

Yep, once it's consumed, the blocklist can be cleared.
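A minimal sketch of that eager clearing, building on the hypothetical OwnedBlockList shown near the top of this page, and assuming that simply dropping the references is enough for Ray's reference counting to reclaim the object store memory:

def consume(block_list: "OwnedBlockList"):
    """Yield blocks; eagerly drop references the consumer owns."""
    for i, block in enumerate(block_list._blocks):
        yield block
        if block_list._owned_by_consumer:
            # Safe only because the consuming API owns these blocks; a
            # block list owned by a user-held Dataset must be kept intact.
            block_list._blocks[i] = None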

@@ -1058,16 +1058,20 @@ def equalize(splits: List[Dataset[T]], num_splits: int) -> List[Dataset[T]]:

block_refs, metadata = zip(*blocks.get_blocks_with_metadata())
metadata_mapping = {b: m for b, m in zip(block_refs, metadata)}
owned_by_consumer = blocks._owned_by_consumer
Contributor:

Hmm, I don't see iter_batches() set run_by_consumer, or any other consumer APIs (such as show(), take(), write_xxx). Am I missing anything?

Contributor (Author):

We cannot do this for Dataset yet; for now it's unchanged.

@c21 (Contributor) commented Jul 21, 2022

I am wondering in which cases we still need eager execution for now. Can we remove support for eager execution post 2.0 and support lazy execution only? That would make the code structure much simpler, I guess. In that case, only the input and output blocks of the execution plan need to be preserved, and all intermediate blocks can be cleaned up.

We could introduce another dataset persist()/cache() API to let users run the execution plan and save the output blocks, in case they need them.

@jianoaix (Contributor, Author) replied:

Currently in all cases (except read) it's eager. I think lazy-only (not just lazy by default) may make sense, just like DatasetPipeline. In that case, we can remove "run_by_consumer", because the plan is always run by the consumer.
We can actually also remove the input (if it's recomputable, e.g. read from external storage or computed via operators like range()), and the output (e.g. in DatasetPipeline, we delete the output once it's consumed).
We may just keep pinned data in memory, including in-memory data used to create the initial input blocks and anything explicitly pinned. Today we have an API, fully_executed(), which could serve a similar purpose to the cache API you mentioned. In a lazy-only world, each consumption/action recomputes from pinned memory or external storage.
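For readers unfamiliar with the fully_executed() API mentioned here, a small usage sketch (based on the Ray 2.0-era Dataset API; treat the exact semantics as an assumption):

import ray

ray.init()

# Build a small dataset with a simple transformation.
ds = ray.data.range(1000).map(lambda x: x * 2)

# fully_executed() executes any outstanding stages of the plan and keeps
# the output blocks in memory, playing a role similar to the
# persist()/cache() API discussed above.
ds = ds.fully_executed()

print(ds.count())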

@jianoaix jianoaix changed the title Make execution stage/blocklist aware if it's run by Dataset or Datase… Make execution plan/blocklist aware of the memory ownership and who runs the plan Jul 22, 2022
@jianoaix removed the @author-action-required label on Jul 22, 2022
@ericl ericl merged commit 8553df4 into ray-project:master Jul 22, 2022
Rohan138 pushed a commit to Rohan138/ray that referenced this pull request on Jul 28, 2022:
…uns the plan (ray-project#26650)
Signed-off-by: Rohan138 <[email protected]>
Stefan-1313 pushed a commit to Stefan-1313/ray_mod that referenced this pull request on Aug 18, 2022:
…uns the plan (ray-project#26650)
Signed-off-by: Stefan van der Kleij <[email protected]>